/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.cloud;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.apache.CloudLegacySolrClient;
import org.apache.solr.client.solrj.apache.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.SolrQuery;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.embedded.JettySolrRunner;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RecoveryZkTest extends SolrCloudTestCase {

  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  @BeforeClass
  public static void setupCluster() throws Exception {
    cluster = configureCluster(2).addConfig("conf", configset("cloud-minimal")).configure();
  }

  private final List<StoppableIndexingThread> threads = new ArrayList<>();

  @After
  public void stopThreads() throws InterruptedException {
    for (StoppableIndexingThread t : threads) {
      t.safeStop();
    }
    for (StoppableIndexingThread t : threads) {
      t.join();
    }
    threads.clear();
  }

  @Test
  public void test() throws Exception {

    final String collection = "recoverytest";

    CollectionAdminRequest.createCollection(collection, "conf", 1, 2)
        .process(cluster.getSolrClient());
    waitForState(
        "Expected a collection with one shard and two replicas", collection, clusterShape(1, 2));

    // start a couple indexing threads

    int[] maxDocList = new int[] {300, 700, 1200, 1350, 3000};
    int[] maxDocNightlyList = new int[] {3000, 7000, 12000, 30000, 45000, 60000};

    int maxDoc;
    if (!TEST_NIGHTLY) {
      maxDoc = maxDocList[random().nextInt(maxDocList.length - 1)];
    } else {
      maxDoc = maxDocNightlyList[random().nextInt(maxDocList.length - 1)];
    }
    log.info("Indexing {} documents", maxDoc);

    try (SolrClient solrClient =
        cluster.basicSolrClientBuilder().withDefaultCollection(collection).build(); ) {
      final StoppableIndexingThread indexThread =
          new StoppableIndexingThread(null, solrClient, "1", true, maxDoc, 1, true);
      threads.add(indexThread);
      indexThread.start();

      final StoppableIndexingThread indexThread2 =
          new StoppableIndexingThread(null, solrClient, "2", true, maxDoc, 1, true);
      threads.add(indexThread2);
      indexThread2.start();

      // give some time to index...
      int[] waitTimes = new int[] {200, 2000, 3000};
      Thread.sleep(waitTimes[random().nextInt(waitTimes.length - 1)]);

      // bring shard replica down
      DocCollection state = getCollectionState(collection);
      Replica leader = state.getLeader("shard1");
      Replica replica = getRandomReplica(state.getSlice("shard1"), (r) -> !leader.equals(r));

      JettySolrRunner jetty = cluster.getReplicaJetty(replica);
      jetty.stop();

      // wait a moment - lets allow some docs to be indexed so replication time is non 0
      Thread.sleep(waitTimes[random().nextInt(waitTimes.length - 1)]);

      // bring shard replica up
      jetty.start();

      // make sure replication can start
      Thread.sleep(3000);

      // stop indexing threads
      indexThread.safeStop();
      indexThread2.safeStop();

      indexThread.join();
      indexThread2.join();

      new UpdateRequest().commit(solrClient, collection);

      cluster
          .getZkStateReader()
          .waitForState(collection, 120, TimeUnit.SECONDS, clusterShape(1, 2));

      // test that leader and replica have same doc count
      state = getCollectionState(collection);
      assertShardConsistency(state.getSlice("shard1"), true);
    }
  }

  private void assertShardConsistency(Slice shard, boolean expectDocs) throws Exception {
    List<Replica> replicas = shard.getReplicas(r -> r.getState() == Replica.State.ACTIVE);
    long[] numCounts = new long[replicas.size()];
    int i = 0;
    for (Replica replica : replicas) {
      try (var client =
          new HttpSolrClient.Builder(replica.getBaseUrl())
              .withDefaultCollection(replica.getCoreName())
              .withHttpClient(((CloudLegacySolrClient) cluster.getSolrClient()).getHttpClient())
              .build()) {
        numCounts[i] =
            client.query(new SolrQuery("*:*").add("distrib", "false")).getResults().getNumFound();
        i++;
      }
    }
    for (int j = 1; j < replicas.size(); j++) {
      if (numCounts[j] != numCounts[j - 1])
        fail("Mismatch in counts between replicas"); // TODO improve this!
      if (numCounts[j] == 0 && expectDocs)
        fail("Expected docs on shard " + shard.getName() + " but found none");
    }
  }
}
